KAFKA-9987: optimize sticky assignment algorithm for same-subscription case #8668

Conversation

@ableegoldman ableegoldman (Contributor) commented May 14, 2020

Motivation and pseudo code algorithm in the ticket.

Added a scale test with a large number of topic partitions and consumers, and a 30s timeout.
With these changes, an assignment with 2,000 consumers and 200 topics of 2,000 partitions each completes within a few seconds.

Porting the same test to trunk, it took 2 minutes even with a 100x reduction in the number of topics (i.e., 2 minutes for 2,000 consumers and 2 topics with 2,000 partitions each).

Should be cherry-picked to 2.6, 2.5, and 2.4

@@ -303,79 +469,17 @@ private int getBalanceScore(Map<String, List<TopicPartition>> assignment) {
Map<String, List<TopicPartition>> consumer2AllPotentialPartitions) {
List<TopicPartition> sortedPartitions = new ArrayList<>();

if (!isFreshAssignment && areSubscriptionsIdentical(partition2AllPotentialConsumers, consumer2AllPotentialPartitions)) {
Contributor Author (ableegoldman):

We can remove all of this: we already checked for identical subscriptions at the start, so by the time we get here we know they are not identical.
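To make the point concrete, the up-front check can be as simple as comparing every consumer's topic set against the first one. This is a minimal sketch under assumed names (allSubscriptionsIdentical and consumerToTopics are illustrative, not the actual helpers in AbstractStickyAssignor):

import java.util.*;

// Illustrative sketch only: method and variable names are assumptions, not the PR's actual code.
private static boolean allSubscriptionsIdentical(Map<String, List<String>> consumerToTopics) {
    Set<String> firstSubscription = null;
    for (List<String> topics : consumerToTopics.values()) {
        Set<String> subscription = new HashSet<>(topics);
        if (firstSubscription == null) {
            firstSubscription = subscription;   // remember the first consumer's topic set
        } else if (!firstSubscription.equals(subscription)) {
            return false;                       // any mismatch forces the general-case algorithm
        }
    }
    return true;
}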

@ableegoldman ableegoldman force-pushed the 9987-efficient-CooperativeStickyAssignor branch from 635f8f0 to afbed93 Compare May 29, 2020 01:00
@@ -169,10 +169,10 @@ public void testAssignmentWithConflictingPreviousGenerations() {
TopicPartition tp5 = new TopicPartition(topic, 5);

List<TopicPartition> c1partitions0 = partitions(tp0, tp1, tp4);
-    List<TopicPartition> c2partitions0 = partitions(tp0, tp2, tp3);
+    List<TopicPartition> c2partitions0 = partitions(tp0, tp1, tp2);
Contributor Author (ableegoldman):

This test was testing an illegal state to begin with: two consumers in the same generation should never both claim to own the same partition. That is the entire reason the generation field was added to the StickyAssignor's subscription userdata in the first place.

@@ -582,35 +578,6 @@ public void testNoExceptionThrownWhenOnlySubscribedTopicDeleted() {
assertTrue(assignment.get(consumerId).isEmpty());
}

@Test
public void testConflictingPreviousAssignments() {
Contributor Author (ableegoldman):

See the comment above: this test was starting from an illegal state. It also doesn't make sense to place it in AbstractStickyAssignorTest, since the cooperative assignor can't have conflicting previous assignments: if a member thinks it still owns a partition that now belongs to another member, it will have to invoke onPartitionsLost before rejoining the group.

@@ -425,8 +422,36 @@ public void testSameSubscriptions() {
assertTrue(assignor.isSticky());
}

@Test(timeout = 30 * 1000)
public void testLargeAssignmentAndGroupWithUniformSubscription() {
int topicCount = 200;
Contributor Author (ableegoldman):

On trunk, this test fails (hits the 30s timeout) even when you reduce the number of topics to just 1
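For context, a rough sketch of what such a scale test can look like (the constants match the snippet above, but the setup and the exact assertions are assumptions, not the test as merged):

@Test(timeout = 30 * 1000)
public void testLargeAssignmentAndGroupWithUniformSubscription() {
    // Illustrative sketch: the partitionsPerTopic / Subscription setup approximates the real test.
    int topicCount = 200;
    int partitionCount = 2_000;
    int consumerCount = 2_000;

    Map<String, Integer> partitionsPerTopic = new HashMap<>();
    List<String> allTopics = new ArrayList<>();
    for (int i = 0; i < topicCount; i++) {
        String topic = "topic-" + i;
        allTopics.add(topic);
        partitionsPerTopic.put(topic, partitionCount);
    }

    Map<String, Subscription> subscriptions = new HashMap<>();
    for (int i = 0; i < consumerCount; i++)
        subscriptions.put("consumer-" + i, new Subscription(allTopics));

    // The assignment must complete before the 30s timeout on the @Test annotation fires.
    assignor.assign(partitionsPerTopic, subscriptions);
}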

@ableegoldman (Contributor Author):

Calling for review from any of @guozhangwang @hachikuji @vvcephei.

@guozhangwang (Contributor):

test this please

@guozhangwang guozhangwang (Contributor) left a comment

LGTM. Just one nit comment.

// other generations are, consider it as having lost its owned partition
if (!memberData.generation.isPresent() && maxGeneration > 0
|| memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
consumerToOwnedPartitions.put(consumer, new ArrayList<>());
Contributor (guozhangwang):

nit: to be consistent, we can just add the consumer to membersWithOldGeneration and then let them be cleared at the end.

Contributor Author (ableegoldman):

Hm, it seems odd to clear it at the end since it's definitely already empty. Note that we're not overwriting the current partitions with an empty list; we're just initializing the assignment for this consumer. I'll add a comment, though.
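As a sketch of the two options being discussed (simplified from the snippet above; the commented-out alternative paraphrases the review suggestion and is not actual PR code):

// Option taken here: a member on an older generation never has its previous ownership recorded,
// so its entry is simply initialized to an empty list up front (nothing is being overwritten).
if (!memberData.generation.isPresent() && maxGeneration > 0
        || memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
    consumerToOwnedPartitions.put(consumer, new ArrayList<>());
}

// Suggested alternative (paraphrased): collect such consumers into membersWithOldGeneration and
// clear their entries in one place at the end, even though those lists would already be empty:
//     membersWithOldGeneration.add(consumer);
//     ...
//     for (String member : membersWithOldGeneration)
//         consumerToOwnedPartitions.get(member).clear();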

for (String consumer : unfilledMembers) {
List<TopicPartition> consumerAssignment = assignment.get(consumer);
int remainingCapacity = minQuota - consumerAssignment.size();
while (remainingCapacity > 0) {
Contributor (guozhangwang):

Is it possible that this unfilled consumer has N+1 remaining capacity, while there are only N partitions remaining to assign?

Contributor (guozhangwang):

Never mind, I realized it should never happen.
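For reference, the filling step roughly looks like the sketch below (simplified; unassignedPartitions and minQuota are assumed to come from earlier steps of the algorithm). Per the exchange above, an unfilled member can never need more partitions than remain unassigned, so the iterator never runs dry:

// Simplified sketch of the fill loop quoted above, not the exact PR code.
Iterator<TopicPartition> unassignedIter = unassignedPartitions.iterator();
for (String consumer : unfilledMembers) {
    List<TopicPartition> consumerAssignment = assignment.get(consumer);
    int remainingCapacity = minQuota - consumerAssignment.size();
    while (remainingCapacity > 0) {
        // safe per the discussion above: enough unassigned partitions always remain
        TopicPartition unassigned = unassignedIter.next();
        consumerAssignment.add(unassigned);
        remainingCapacity--;
    }
}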


// Keep track of the partitions being migrated from one consumer to another during assignment
// so the cooperative assignor can adjust the assignment
protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();
Contributor Author (ableegoldman):

This is just an optimization for the cooperative case: I found that the assignment times for the eager and cooperative assignors began to diverge once you reached partition counts in the millions. At 10 million partitions, for example, the eager assignor hovered around 30s while the cooperative assignor took upwards of 5-6 minutes.
The discrepancy was entirely due to the adjustAssignment method needing to compute the set of partitions transferring ownership from the completed assignment. But we can build up this map during assignment much more efficiently, by taking advantage of the additional context we have at various steps in the algorithm. Tracking this set during assignment and exposing it to the cooperative assignor cut the assignment time for large partition counts drastically, putting the cooperative assignor on par with the eager assignor.
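As a rough illustration of the idea (a sketch, not the literal PR code): at the moment the algorithm gives a partition to a consumer other than its previous owner, it already knows both parties, so it can record the transfer on the spot instead of diffing the old and new assignments afterwards. The previousOwner parameter below is an assumed lookup, not an actual field name:

// Illustrative sketch: record an ownership transfer at the moment it is decided.
protected Map<TopicPartition, String> partitionsTransferringOwnership = new HashMap<>();

private void assignPartition(TopicPartition partition, String newConsumer, String previousOwner,
                             Map<String, List<TopicPartition>> assignment) {
    assignment.get(newConsumer).add(partition);
    if (previousOwner != null && !previousOwner.equals(newConsumer)) {
        // the cooperative assignor uses this map to ensure the partition is revoked from its
        // previous owner before the new consumer is allowed to own it
        partitionsTransferringOwnership.put(partition, newConsumer);
    }
}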

} else {
log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
+ "general case assignment algorithm");
partitionsTransferringOwnership = null;
Contributor Author (ableegoldman):

I didn't bother to include this optimization for the general case. We know that the general assignment algorithm itself becomes a bottleneck at only 2,000 partitions, so there's no point optimizing something that only becomes a bottleneck at millions of partitions.
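For the general case, the map is set to null and the transfers can still be recovered afterwards by diffing ownership, roughly as in this sketch (previousOwnerOf and newAssignment are assumed inputs, not the actual field names):

// Illustrative fallback when partitionsTransferringOwnership was not built during assignment:
// recompute the transfers by comparing each partition's previous owner to its new owner.
Map<TopicPartition, String> transferring = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> entry : newAssignment.entrySet()) {
    String newOwner = entry.getKey();
    for (TopicPartition tp : entry.getValue()) {
        String previousOwner = previousOwnerOf.get(tp);   // assumed: Map<TopicPartition, String>
        if (previousOwner != null && !previousOwner.equals(newOwner))
            transferring.put(tp, newOwner);
    }
}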

Comment on lines -108 to -110
partitionsPerTopic.put(topic, 3);
partitionsPerTopic.put(otherTopic, 3);
subscriptions = Collections.singletonMap(consumerId, new Subscription(topics(topic)));
Contributor Author (ableegoldman):

This test was also starting from an illegal state -- partitionsPerTopic only contains metadata for topics included in the subscription. I noticed that we don't seem to be testing the actual valid case, where some consumers have ownedPartitions that are no longer in the subscription, so I just adapted this test for that related purpose.

@ableegoldman (Contributor Author):

@guozhangwang made a few more changes, ready for another review

@twmb commented Jun 1, 2020

From what I can tell, this looks good to me. It loses one mostly insignificant "optimization" that does not really matter in practice: previously, if an old-generation member was rejoining, the code would try to re-sticky partitions to those old members for any partitions that are now on overloaded members or are unassigned. This is a pretty minor optimization, though, and deleting this logic entirely from my own balancer breaks no tests.

This algorithm primarily differs from mine by doing a bunch of up front checking work, and then doing a "single" pass that performs all assignments. Mine does a bunch of assigning while doing checks, and then does a small balancing pass. Both of these options are great, though!

Pretty nifty observation about building partitionsTransferringOwnership while doing assignment. I'm going to have to figure out if that's even possible with my approach--your algorithm can do that because of its one pass, whereas mine loses some context of who started with what by the time it gets to balancing.

@guozhangwang guozhangwang (Contributor) left a comment

LGTM on the new optimization.

@ableegoldman (Contributor Author):

@twmb yeah, I should point out in the ticket that this approach drops the optimization giving preference to older-generation owners of a partition. I actually don't think it would be particularly difficult to incorporate into this new algorithm, but my take was that it still adds more complexity than any benefit it provides.
We actually dropped this implicitly in the cooperative assignor, since a member with an older generation will have to give up all of its owned partitions before rejoining the group anyway.

It was nice to be able to build up the partitionsTransferringOwnership with the additional context we have while crafting the assignment, but to be fair it may be somewhat of an over-optimization at this point. The adjustAssignment loop that was building it up from scratch still performed fine up to ~5-10 million partitions. But I figure, better to optimize now and not have to worry about it later 🙂

@guozhangwang guozhangwang merged commit c6633a1 into apache:trunk Jun 1, 2020
guozhangwang pushed three commits that referenced this pull request Jun 1, 2020, each with the same message:

KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (#8668)

Motivation and pseudo code algorithm in the ticket.

Added a scale test with a large number of topic partitions and consumers, and a 30s timeout.
With these changes, an assignment with 2,000 consumers and 200 topics of 2,000 partitions each completes within a few seconds.

Porting the same test to trunk, it took 2 minutes even with a 100x reduction in the number of topics (i.e., 2 minutes for 2,000 consumers and 2 topics with 2,000 partitions each).

Should be cherry-picked to 2.6, 2.5, and 2.4

Reviewers: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang (Contributor):

Cherry-picked to 2.6/2.5/2.4.

@ijuma (Contributor) commented Jun 2, 2020

Did we check the build before merging this? It seems to have broken it:
#8779

@ijuma (Contributor) commented Jun 2, 2020

@guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should generally also build locally when cherry-picking.

@ableegoldman (Contributor Author):

Sorry @ijuma, I think I only ever ran the local tests + checkstyle, not the full suite. My mistake

// If the current member's generation is higher, all the previous owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
membersOfCurrentHighestGeneration.clear();
Contributor Author (ableegoldman):

Just FYI, I introduced this bug right before merging. Luckily the tests caught it -- fix is #8777
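For context, the intent of this block, paraphrased as a sketch (this is only an illustration of the generation bookkeeping, not the #8777 fix itself):

// Illustrative paraphrase of the generation bookkeeping around the quoted lines.
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
    // a strictly higher generation invalidates the ownership claims of every member
    // previously thought to be on the highest generation
    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
    membersOfCurrentHighestGeneration.clear();
    maxGeneration = memberData.generation.get();   // remember the new highest generation seen
}
membersOfCurrentHighestGeneration.add(consumer);   // this member is on the highest generation so far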

@ijuma (Contributor) commented Jun 2, 2020

@ableegoldman No worries, I do the same. We just need to check the PR result before merging. Additionally, committers should run checkstyle and spotBugs when cherry-picking to older branches.

ijuma added a commit to confluentinc/kafka that referenced this pull request Jun 3, 2020
* apache-github/2.6: (32 commits)
  KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (apache#8786)
  KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (apache#8737)
  KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (apache#8695)
  KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (apache#8777)
  MINOR: Remove unused variable to fix spotBugs failure (apache#8779)
  MINOR: ChangelogReader should poll for duration 0 for standby restore (apache#8773)
  KAFKA-10030: Allow fetching a key from a single partition (apache#8706)
  Kafka-10064 Add documentation for KIP-571 (apache#8760)
  MINOR: Code cleanup and assertion message fixes in Connect integration tests (apache#8750)
  KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (apache#8668)
  KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (apache#7956)
  KAFKA-10074: Improve performance of `matchingAcls` (apache#8769)
  KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723)
  KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739)
  KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705)
  KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749)
  KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238)
  KAFKA-9501: convert between active and standby without closing stores (apache#8248)
  MINOR: Relax Percentiles test (apache#8748)
  MINOR: regression test for task assignor config (apache#8743)
  ...
@ableegoldman ableegoldman deleted the 9987-efficient-CooperativeStickyAssignor branch June 26, 2020 22:37